RabbitMQ 学习笔记 您所在的位置:网站首页 rejected by concurrency RabbitMQ 学习笔记

RabbitMQ 学习笔记

2024-01-05 16:17| 来源: 网络整理| 查看: 265

重试机制 消费者抛出异常情况

模拟程序异常

@RabbitListener(queues = "work_queue") public void receiver(String msg) long deliveryTag) throws IOException { System.out.println("工作队列 work_queue 消费信息:" + msg + " -- " + LocalDateTime.now()); int i = 1/0; }

效果:一直循环消费并抛出异常

@RabbitHandler注解 底层使用Aop拦截,如果程序没有抛出异常,自动提交事务。如果Aop使用异常通知拦截获取到异常后,自动实现补偿机制,这个补偿机制的消息会缓存到 RabbitMQ 服务器端进行存放,一直重试到不抛出异常为止。

因为 spring.rabbitmq.listener.simple.acknowledge-mode 默认为 auto,该消息会一直缓存在 RabbitMQ 服务器端进行重放,所以在抛出异常后,默认将消息发送回队列,然后消费者继续消费该条消息,一直重试到不抛出异常为准。

spring.rabbitmq.listener.simple.default-requeue-rejected 默认为 true,当监听器抛出异常而拒绝的消息 会重新发回队列,false 表示不会放回队列。

如果将 spring.rabbitmq.listener.simple.acknowledge-mode 设置为 manual 话,因为设置了手动应答模式,所以消息会一直未应答,该线程一直占用这该条消息。

retry 的重试机制 消费者 @RabbitListener(queues = "work_queue") public void receiver(String msg) long deliveryTag) throws IOException { System.out.println("工作队列 work_queue 消费信息:" + msg + " -- " + LocalDateTime.now()); int i = 1/0; } yml spring: rabbitmq: listener: simple: retry: # 开启消费者重试 enabled: true # 最大重试次数 默认3次 max-attempts: 5 # 重试间隔次数 毫秒,默认1秒 initial-interval: 3000

如上,如果监听消息的方法抛出异常,消息会按照 listener.simple.retry 的配置进行重发 3 次,但是重发次数完了之后还抛出异常的话会调用 MessageRecoverer 的 recover 方法,其默认的实现为 RejectAndDontRequeueRecoverer,这个实现是将异常打印抛出并且发送 nack requeue=false 不会重新入队了,源码如下:

public class RejectAndDontRequeueRecoverer implements MessageRecoverer { protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class); // NOSONAR protected @Override public void recover(Message message, Throwable cause) { if (this.logger.isWarnEnabled()) { this.logger.warn("Retries exhausted for message " + message, cause); } throw new ListenerExecutionFailedException("Retry Policy Exhausted", new AmqpRejectAndDontRequeueException(cause), message); } }

在 work_queue 中发布一条消息后

工作队列 work_queue 消费信息:模拟一条异常的消息 -- 2020-03-26T12:45:43.361 工作队列 work_queue 消费信息:模拟一条异常的消息 -- 2020-03-26T12:45:46.361 工作队列 work_queue 消费信息:模拟一条异常的消息 -- 2020-03-26T12:45:49.363 工作队列 work_queue 消费信息:模拟一条异常的消息 -- 2020-03-26T12:45:52.363 工作队列 work_queue 消费信息:模拟一条异常的消息 -- 2020-03-26T12:45:55.363 2020-03-26 12:45:55.365 WARN 13900 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'[B@6ac95059(byte[27])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=work_queue, deliveryTag=1, consumerTag=amq.ctag-B6klDdjiu3mdBeWRzltARA, consumerQueue=work_queue]) org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void XXX.receiver(java.lang.String,com.rabbitmq.client.Channel,long,java.util.Map) throws java.io.IOException' threw exception

我们可以看见消费者在消费信息时抛出异常后,RabbitMQ 每 3 秒进行重试,重试 5 次后就将该信息放弃,注意:这里的重试只是在当前线程里的重试,就是让线程休眠了,不是重回到 broke。

选择重试机制 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试,可能是因为网络原因短暂不能访问 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要重试,因为属于程序bug需要重新发布版本

对于第二种情况,如果是程序问题错误抛出的异常,不需要重试,重试也无济于事。可以采用 日志记录 + 定时任务 job 健康检查 + 人工进行补偿。

根据 retry 特性,我们可以用以下方案来配置重试机制

因为默认的 RepublishMessageRecoverer 会发送 nack 并且 requeue 为 false,所以给队列绑定死信交换机,在消费者异常到最大次数时,将信息发送到指定的死信队列中了。

使用 RepublishMessageRecoverer 这个 MessageRecoverer 会将原来的信息进行封装一下(如添加进异常信息)再发送消息到指定队列,这里看自我选择了。

@Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, "target_exchange", "routingKey"); }

注册自己实现的 MessageRecoverer (或 RejectAndDontRequeueRecoverer),在 recover 中写具体的操作,可以是保存至数据库,发邮件等操作。

@Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) { return new MessageRecoverer() { @Override public void recover(Message message, Throwable cause) { System.out.println("messageRecoverer: " + JSON.toJSONString(message)); Map headers = message.getMessageProperties().getHeaders(); headers.put("x-exception-stacktrace", getStackTraceAsString(cause)); headers.put("x-exception-message", cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage()); headers.put("x-original-exchange", message.getMessageProperties().getReceivedExchange()); headers.put("x-original-routingKey", message.getMessageProperties().getReceivedRoutingKey()); rabbitTemplate.send(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), message); } }; } // 用来将异常栈的消息字符串显示 private String getStackTraceAsString(Throwable cause) { StringWriter stringWriter = new StringWriter(); PrintWriter printWriter = new PrintWriter(stringWriter, true); cause.printStackTrace(printWriter); return stringWriter.getBuffer().toString(); }

以上是模仿 RepublishMessageRecoverer 封装消息,然后发送回原来的交换机中的逻辑。

给MessageListenerContainer设置RecoveryCallback

对于方法手动捕获异常,进行处理

注意:以上方法都是默认 spring.rabbitmq.listener.simple.acknowledge-mode=auto 的配置,如果设置为 manual,原来的异常信息依然在消费者中处于 unacked 状态。此时执行了重新发送消息等操作的话,之前的异常信息还是存在的。

消息的幂等性

消费者在消费mq中的消息时,mq已把消息发送给消费者,消费者在给mq返回ack时网络中断,故mq未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息;

解决方法

使用全局 MessageID 判断消费方使用同一个,解决幂等性问题。 使用业务逻辑保证唯一(比如订单号码)

生产者 @Test public void demo_09_Producer() { String exchange = "rabbit_work_exchange"; String routingKey = "a"; String msg = "携带唯一标识的一条消息"; rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData(UUID.randomUUID().toString())); }

如上,生产者在发送消息时,给消息对象设置了唯一的 MessageID (CorrelationData 对象中设置),只有该 MessageID 没有被消费者标记方能在重试机制中再次被消费。

消费者 @RabbitListener(queues = "work_queue") public void receiver(@Payload String msg, @Headers Map headers) { System.out.println("工作队列 work_queue 消费信息:" + msg + " messageId: " + headers.get("spring_returned_message_correlation")); }

如上,通过 headers.get("spring_returned_message_correlation") (或*message.getMessageProperties().getMessageId() ) 获取 MessageID,获取的 MessageID 可以用来判断是否已经被消费者消费过了,如果已经消费则取消再次消费。

可以将该 ID 存至 redis 中,下次再调用时,先去 redis 判断是否存在该 ID 了,如果存在表明已经消费过了则直接返回,不再消费,否则消费,然后将记录存至 redis。

利用数据库主键去重



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有